package com.data.dev.flink.mailTopic.main;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.elasticsearch.ElasticSearchInfo;
import com.data.dev.elasticsearch.SinkToEs;
import com.data.dev.flink.FlinkEnv;
import com.data.dev.flink.mailTopic.OperationForLoginFailCheck.*;
import com.data.dev.kafka.KafkaSourceBuilder;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;


/**
 * Flink处理在3分钟内连续登录失败20次后登录成功的场景
 * 采用滑动窗口来实现
 * @author wangxiaoming-ghq 2022-06-01
 */

@Slf4j
public class MailMsg extends BaseBean {

    /**
     * Flink作业名称
     */
    public static final  String JOB_NAME = "告警采集平台——连续登录失败后登录成功告警";


    public MailMsg(){
        log.info("初始化滑动窗口场景告警程序");
    }

    /**
     * 执行逻辑统计场景，实现告警推送
     */
    public static void execute(){


        //① 创建Flink执行环境并设置checkpoint等必要的参数
        StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv();
        KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(ConfigurationKey.KAFKA_MAIL_TOPIC_NAME,ConfigurationKey.KAFKA_MAIL_CONSUMER_GROUP_ID) ;
        DataStreamSource<String> kafkaMailMsg = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)), "Kafka Source for AlarmPlatform About Mail Topic");


        //② 筛选登录消息，创建初始登录事件流
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginMapDs = kafkaMailMsg.map(new MsgToBeanMapper()).name("Map算子加工流");
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginFilterDs = loginMapDs.filter(new MailMsgForLoginFilter()).name("Filter算子加工流");

        //③ 设置水位线
        WatermarkStrategy<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> watermarkStrategy = WatermarkStrategy.<com.data.dev.common.javabean.kafkaMailTopic.MailMsg>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((mailMsg, timestamp) -> TimeUtils.switchUTCToBeijingTimestamp(mailMsg.getTimestamp_datetime()));
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginWmDs = loginFilterDs.assignTimestampsAndWatermarks(watermarkStrategy.withIdleness(Duration.ofMinutes(3))).name("增加水位线");

        //④ 设置主键
        KeyedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String> loginKeyedDs = loginWmDs.keyBy(new LoginKeySelector());

        //⑥ 转化为滑动窗口
        WindowedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String, TimeWindow> loginWindowDs = loginKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(180L),Time.seconds(90L)));

        //⑦ 在窗口内进行逻辑统计
        SingleOutputStreamOperator<MailMsgAlarm> loginWindowsDealDs  = loginWindowDs.process(new WindowProcessFuncImpl()).name("窗口处理逻辑");

        //⑧ 将结果转化为通用DataStream<String>格式
        SingleOutputStreamOperator<String> resultDs  = loginWindowsDealDs.map(new AlarmMsgToStringMapper()).name("窗口结果转化为标准格式");

        //⑨ 将最终结果写入ES
        resultDs.addSink(SinkToEs.getEsSinkBuilder(ElasticSearchInfo.ES_LOGIN_FAIL_INDEX_NAME,ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT).build());

        //⑩ 提交Flink集群进行执行
        FlinkEnv.envExec(env,JOB_NAME);

    }
}
